-
Notifications
You must be signed in to change notification settings - Fork 27
[AIT-258] feat: add Realtime mutable message support #660
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
WalkthroughAdds realtime mutable-message support: protocol version bumped to 5, publish now returns server PublishResult, new update/delete/append message flows and REST helpers, ACK handling extended to carry PublishResult lists through ConnectionManager, and tests covering mutable-message behaviors. Changes
Sequence Diagram(s)sequenceDiagram
participant App as App
participant Channel as RealtimeChannel
participant Conn as ConnectionManager
participant WS as WebSocketTransport
participant Server as Server
App->>Channel: publish()/update_message()/delete_message()/append_message()
Channel->>Conn: send_protocol_message(pm, await_ack=True)
Conn->>WS: emit WebSocket frame
WS->>Server: send ProtocolMessage
Server->>WS: ACK (serial, count, optional res[])
WS->>Conn: on_ack(serial, count, res[])
Conn->>Conn: complete_messages(serial, count, res)
Conn->>Channel: resolve pending futures with PublishResult(s)
Channel->>App: return PublishResult / UpdateDeleteResult
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@test/ably/realtime/realtimechannelmutablemessages_test.py`:
- Around line 182-191: The test calls channel.publish with a keyword
(channel.publish(messages=messages)) but the publish implementation inspects
only positional args (checks len(args)), so kwargs are ignored and cause a
ValueError; change the call to pass the messages list positionally
(channel.publish(messages)) so publish receives the messages via args and the
test will exercise publish correctly.
🧹 Nitpick comments (2)
ably/realtime/connectionmanager.py (1)
99-109: Consider handling mismatched lengths between completed messages and results.Using
zip_longestensures all completed messages are processed even whenres_listis shorter or empty. However, ifres_listis longer thancompleted_messages,msgwill beNonefor the extra items, which could cause anAttributeErrorwhen accessingmsg.future.♻️ Suggested fix to guard against None msg
# Default res to empty list if None res_list = res if res is not None else [] for (msg, publish_result) in zip_longest(completed_messages, res_list): + if msg is None: + continue if msg.future and not msg.future.done(): if err: msg.future.set_exception(err)ably/realtime/realtime_channel.py (1)
964-981: Potential ambiguity when bothoptionsandkwargsare provided.The current logic converts
kwargstoChannelOptionsonly whenoptionsisNone. If a user accidentally provides bothoptionsandkwargs, thekwargswill be silently ignored. Consider raising an error or warning in this case.♻️ Suggested fix to warn about ignored kwargs
# Convert kwargs to ChannelOptions if provided if kwargs and not options: options = ChannelOptions(**kwargs) + elif kwargs and options: + log.warning("Channels.get(): kwargs ignored when options is provided") elif options and isinstance(options, dict): options = ChannelOptions.from_dict(options)
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Jira integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (5)
ably/realtime/connectionmanager.pyably/realtime/realtime_channel.pyably/transport/defaults.pyably/transport/websockettransport.pytest/ably/realtime/realtimechannelmutablemessages_test.py
🧰 Additional context used
🧬 Code graph analysis (2)
ably/transport/websockettransport.py (2)
ably/realtime/connectionmanager.py (3)
ably(780-781)on_ack(468-479)count(59-61)ably/types/operations.py (1)
PublishResult(48-67)
ably/realtime/connectionmanager.py (2)
ably/types/operations.py (1)
PublishResult(48-67)ably/util/exceptions.py (1)
AblyException(9-90)
🪛 Gitleaks (8.30.0)
test/ably/realtime/realtimechannelmutablemessages_test.py
[high] 158-158: Detected a Generic API Key, potentially exposing access to various services and sensitive operations.
(generic-api-key)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (7)
- GitHub Check: check (3.14)
- GitHub Check: check (3.12)
- GitHub Check: check (3.11)
- GitHub Check: check (3.8)
- GitHub Check: check (3.13)
- GitHub Check: check (3.10)
- GitHub Check: check (3.9)
🔇 Additional comments (19)
ably/transport/websockettransport.py (2)
15-15: LGTM!Import of
PublishResultis correctly added to support the new ACK handling flow.
176-179: LGTM!The ACK handling correctly extracts the optional
resfield, converts each item to aPublishResultobject when present, and passes it to the connection manager. The null-safety check before conversion is appropriate.ably/realtime/connectionmanager.py (7)
7-7: LGTM!Import of
zip_longestis appropriate for aligning completed messages with their correspondingPublishResultentries.
34-34: LGTM!Updated type annotation correctly reflects that the future now resolves to a
PublishResultinstead ofNone.
63-78: LGTM!The updated signature correctly accepts the
resparameter and the docstring accurately describes the new behavior.
216-253: LGTM!The
send_protocol_messagemethod correctly returns thePublishResultwhen awaiting ACK-required messages andNoneotherwise. The flow handles both queued and immediate sends appropriately.
255-273: LGTM!The
_send_protocol_message_on_connected_statemethod correctly propagates thePublishResultfrom the future when ACK is required.
468-479: LGTM!The
on_ackmethod correctly accepts and forwards theresparameter tocomplete_messages.
493-493: LGTM!Correctly passes
Noneforresand the error tocomplete_messagesfor NACK handling.ably/realtime/realtime_channel.py (5)
13-15: LGTM!Imports are correctly added to support the new mutable message operations.
394-494: LGTM!The
publishmethod correctly returns thePublishResultfrom the connection manager, enabling callers to access message serials for subsequent update/delete operations.
526-606: Consider using MessageAction enum for action.name in log statement.The
_send_updateimplementation correctly validates serial presence, constructs the message with version metadata, handles encryption, and returns theUpdateDeleteResult. The logic is sound.One minor observation: Line 588 uses
action.namein the log, which assumesMessageActionis an enum with anameattribute - this should work correctly with Python enums.
608-672: LGTM!The public convenience methods
update_message,delete_message, andappend_messageare well-documented and correctly delegate to_send_updatewith the appropriateMessageActionvalues.
674-718: LGTM!The REST delegation methods correctly call the parent
Channelmethods, providing a unified API for accessing messages via the REST API from the realtime channel.test/ably/realtime/realtimechannelmutablemessages_test.py (4)
15-23: LGTM!Good use of parametrization to test both JSON and MsgPack transports. The fixture properly configures the Ably client based on the transport type.
154-176: Test key is appropriate for testing - not a real secret.The static analysis tool flagged
'keyfordecrypt_16'as a potential API key, but this is a test encryption key used only in unit tests and is not a real secret. This is a false positive.
266-278: LGTM!The helper method
wait_until_message_with_action_appearscorrectly uses polling to wait for a message with a specific action, with proper exception handling.
280-289: LGTM!The helper method
wait_until_get_all_message_versioncorrectly polls for version history until the expected count is reached.ably/transport/defaults.py (1)
2-2: Protocol version 5 is properly supported and already tested.Ably Protocol v5 is officially supported with comprehensive test coverage for mutable messages (updates, deletes, appends) in both REST and realtime channels. The integration in
connectionmanager.pycorrectly passes the protocol version, and the test infrastructure includes the required mutable messages support.
✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.
8ff2552 to
393693c
Compare
7c26134 to
d304d0a
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
test/ably/realtime/realtimechannel_publish_test.py (1)
456-458: Assertion may fail after the PublishResult changes.With the updated
complete_messagesimplementation, whenresisNone, the future is resolved with an emptyPublishResult()object, notNone. This assertion should be updated.🔧 Suggested fix
# Future should be resolved result = await asyncio.wait_for(publish_future, timeout=1) - assert result is None + assert result is not None + assert result.serials == []
♻️ Duplicate comments (1)
test/ably/realtime/realtimechannelmutablemessages_test.py (1)
182-191: Thechannel.publish(messages=messages)call is incorrect.The
publishmethod only processes positional arguments vialen(args). Calling withmessages=messagesas a keyword argument will result in an emptyargstuple, causing aValueError.🔧 Suggested fix
- result = await channel.publish(messages=messages) + result = await channel.publish(messages)
c921aa0 to
1723f5d
Compare
d304d0a to
e843d08
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@ably/realtime/connectionmanager.py`:
- Around line 99-109: The loop using zip_longest between completed_messages and
res_list can yield msg==None when res_list is longer, which causes accesses like
msg.future to raise; update the loop in connectionmanager.py (where zip_longest
is used) to either iterate directly over completed_messages and index into
res_list (e.g., for i, msg in enumerate(completed_messages): publish_result =
res_list[i] if i < len(res_list) else None) or add an explicit guard (if msg is
None: continue) before touching msg.future, and keep the existing logic that
substitutes PublishResult() when publish_result is None.
In `@ably/realtime/realtime_channel.py`:
- Around line 597-603: The code builds a protocol_message using params.items()
but params can be None; update the method in realtime_channel (the block
constructing protocol_message with ProtocolMessageAction.MESSAGE, self.name and
messages [msg_dict]) to guard against None by treating params as an empty dict
when not passed (e.g., use params = params or {} or a conditional variable)
before the dict comprehension, and then apply the existing {k: str(v).lower() if
type(v) is bool else v for k, v in params.items()} logic so update/delete/append
calls that omit params do not raise.
♻️ Duplicate comments (1)
test/ably/realtime/realtimechannelmutablemessages_test.py (1)
182-189:publish(messages=messages)will raise; pass list positionally instead.
publish()only inspects*args, so kwargs are ignored and cause a ValueError.🛠️ Suggested fix
- result = await channel.publish(messages=messages) + result = await channel.publish(messages)
🧹 Nitpick comments (1)
ably/realtime/connectionmanager.py (1)
216-223: Update the docstring return type.
send_protocol_messagenow returnsPublishResult | None, but the docstring still statesNone.
e843d08 to
f548267
Compare
f548267 to
995935b
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
ably/realtime/connectionmanager.py (1)
216-223: Docstring return type is inconsistent with signature.The docstring states
Returns: Nonebut the function signature and implementation returnPublishResult | None.📝 Suggested fix
"""Send a protocol message and optionally track it for acknowledgment Args: protocol_message: protocol message dict (new message) Returns: - None + PublishResult if the message requires acknowledgment, None otherwise """
♻️ Duplicate comments (1)
ably/realtime/connectionmanager.py (1)
99-109:zip_longestcan still producemsg=Nonewhenres_listis longer.The previous review flagged this issue. While the server should not send more results than messages, defensive coding would prevent a potential
AttributeErrorif it does. The suggested fix was to iterate overcompleted_messagesand index intores_list:🛠️ Suggested defensive fix
- res_list = res if res is not None else [] - for (msg, publish_result) in zip_longest(completed_messages, res_list): + res_list = res or [] + for idx, msg in enumerate(completed_messages): + publish_result = res_list[idx] if idx < len(res_list) else None if msg.future and not msg.future.done():
🧹 Nitpick comments (2)
test/ably/realtime/realtimechannelmutablemessages_test.py (2)
2-2: Consider using built-inlistinstead oftyping.List.Python 3.9+ supports using
listdirectly in type annotations. TheListimport fromtypingis unnecessary if the project targets Python 3.9+.📝 Suggested fix
import logging -from typing import List import pytestAnd at line 281:
- versions: List[Message] = [] + versions: list[Message] = []
266-278: Consider catching specific exceptions in helper method.The bare
except Exceptionat line 273 catches all exceptions, which could mask unexpected errors during testing. Consider catching the specific exceptions expected (e.g.,AblyExceptionfor not-found cases).📝 Suggested fix
async def check_message_action(): nonlocal message try: message = await channel.get_message(serial) return message.action == action - except Exception: + except AblyException: return False
- Updated `ConnectionManager` and `MessageQueue` to process `PublishResult` during acknowledgments (ACK/NACK). - Extended `send_protocol_message` to return `PublishResult` for publish tracking. - Bumped default `protocol_version` to 5. - Added tests for message update, delete, append operations, and PublishResult handling.
995935b to
0b93c10
Compare
owenpearson
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm 👍
ConnectionManagerandMessageQueueto processPublishResultduring acknowledgments (ACK/NACK).send_protocol_messageto returnPublishResultfor publish tracking.protocol_versionto 5.Summary by CodeRabbit
New Features
Chores
Tests
✏️ Tip: You can customize this high-level summary in your review settings.